package cn.doitedu.rtdw.data_etl;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">Doitedu</a>
 * @QQ: 657270652
 * @Date: 2023/02/05
 * @Tips: Learn big data at Doitedu
 * @Desc:
 *   Real-time overview statistics of user behavior events: a lightly
 *   aggregated job at 10-minute granularity.
 *   <p>
 *   The job reads the Kafka topic {@code mall-events-wide}, drops the events
 *   {@code page_load}, {@code put_back} and {@code wake_up}, counts the
 *   remaining events per 10-minute tumbling window and dimension combination,
 *   and inserts the counts into the Doris table
 *   {@code dws.mall_evts_overview_agg}.
 **/
public class E04_EtlJob_EventsOverviewAgg {

    /**
     * Job entry point: configures the Flink environment, registers the source
     * table, the sink table and the {@code time_round} function, then submits
     * the windowed INSERT statement.
     *
     * @param args command-line arguments; not read by this job
     */
    public static void main(String[] args) {
        // Execution environment: exactly-once checkpoints with interval 2000
        // (milliseconds per the Flink API), stored on the local file system,
        // with a job-wide parallelism of 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/checkpoint");
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Kafka connector source table: events widened with the common dimensions.
        tEnv.executeSql(sourceTableDdl());

        // Doris connector sink table: 10-minute event-count aggregates.
        // The current epoch millis become part of the sink label prefix.
        tEnv.executeSql(sinkTableDdl(System.currentTimeMillis()));

        // The UDF must be registered before the INSERT statement that calls it.
        tEnv.createTemporaryFunction("time_round", E02_EtlJob_TrafficMinuteAgg.TimeRound2.class);

        // Aggregate events over 10-minute tumbling windows and write the result to Doris.
        tEnv.executeSql(aggregationInsertSql());
    }

    /**
     * Builds the DDL of the Kafka-backed source table
     * {@code mall_events_commondim_kfksource}.
     * <p>
     * Besides the physical JSON columns, the table declares the computed
     * column {@code rt} (derived from {@code event_time}) and a watermark on
     * {@code rt} with a zero-second delay; the watermark is required by the
     * time-window aggregation executed later.
     *
     * @return the CREATE TABLE statement for the source table
     */
    private static String sourceTableDdl() {
        return " CREATE TABLE mall_events_commondim_kfksource(          " +
                "     user_id           INT,                            " +
                "     username          string,                         " +
                "     session_id        string,                         " +
                "     event_id          string,                         " +
                "     event_time        bigint,                         " +
                "     lat               double,                         " +
                "     lng               double,                         " +
                "     release_channel   string,                         " +
                "     device_type       string,                         " +
                "     properties        map<string,string>,             " +
                "     register_phone    STRING,                         " +
                "     user_status       INT,                            " +
                "     register_time     TIMESTAMP(3),                   " +
                "     register_gender   INT,                            " +
                "     register_birthday DATE, register_province STRING, " +
                "     register_city STRING, register_job STRING, register_source_type INT,   " +
                "     gps_province   STRING, gps_city STRING, gps_region STRING,             " +
                "     page_type   STRING, page_service STRING,         " +
                "     rt as to_timestamp_ltz(event_time,3) ,           " +
                // Watermark declaration, needed for the time-window statistics below.
                "     watermark for  rt as rt - interval '0' seconds   " +
                " ) WITH (                                             " +
                "  'connector' = 'kafka',                              " +
                "  'topic' = 'mall-events-wide',                       " +
                "  'properties.bootstrap.servers' = 'doitedu:9092',    " +
                "  'properties.group.id' = 'testGroup',                " +
                "  'scan.startup.mode' = 'latest-offset',            " +
                "  'value.format'='json',                              " +
                "  'value.json.fail-on-missing-field'='false',         " +
                "  'value.fields-include' = 'EXCEPT_KEY')              ";
    }

    /**
     * Builds the DDL of the Doris-backed sink table
     * {@code evts_ov_agg_dorissink}, which maps to
     * {@code dws.mall_evts_overview_agg}.
     *
     * @param labelSuffix value appended to {@code doris_tl} to form the
     *                    {@code sink.label-prefix} option
     * @return the CREATE TABLE statement for the sink table
     */
    private static String sinkTableDdl(long labelSuffix) {
        return " create table evts_ov_agg_dorissink(    " +
                "    dt  DATE,                           " +
                "    time_60m          STRING,           " +
                "    time_30m          STRING,           " +
                "    time_10m          STRING,           " +
                "    page_service      STRING,           " +
                "    page_type         STRING,           " +
                "    release_channel   STRING,           " +
                "    device_type       STRING,           " +
                "    event_id          STRING,           " +
                "    user_id           INT,              " +
                "    act_count         BIGINT            " +
                " ) WITH (                               " +
                "    'connector' = 'doris',              " +
                "    'fenodes' = 'doitedu:8030',         " +
                "    'table.identifier' = 'dws.mall_evts_overview_agg',  " +
                "    'username' = 'root',                " +
                "    'password' = '',                    " +
                "    'sink.label-prefix' = 'doris_tl" + labelSuffix + "')";
    }

    /**
     * Builds the INSERT statement that aggregates the source events and
     * writes them to the sink table.
     * <p>
     * The CTE {@code tmp} derives {@code dt} and the three
     * {@code time_round} columns and filters out the excluded event ids; the
     * outer query counts rows per 10-minute tumbling window on {@code rt} and
     * per dimension combination.
     * <p>
     * NOTE(review): {@code time_round} is defined outside this file;
     * presumably it maps the epoch-millis {@code event_time} to an N-minute
     * bucket label - confirm against {@code TimeRound2}.
     *
     * @return the INSERT INTO ... SELECT statement
     */
    private static String aggregationInsertSql() {
        return " INSERT INTO evts_ov_agg_dorissink                                           " +
                " WITH tmp AS (                                                               " +
                " SELECT                                                                      " +
                "    to_date(date_format(to_timestamp_ltz(event_time,3),'yyyy-MM-dd')) as dt, " +
                "    time_round(event_time,60)  as time_60m,                                  " +
                "    time_round(event_time,30)  as time_30m,                                  " +
                "    time_round(event_time,10)  as time_10m,                                  " +
                "    user_id,                                                                 " +
                "    page_service,                                                            " +
                "    page_type,                                                               " +
                "    release_channel,                                                         " +
                "    device_type,                                                             " +
                "    event_id,                                                                " +
                "    rt                                                                       " +
                " FROM  mall_events_commondim_kfksource                                       " +
                " WHERE event_id not in ('page_load','put_back','wake_up')                    " +
                " )                                                                           " +
                "                                                                             " +
                " SELECT                                                                      " +
                "   dt,time_60m,time_30m,time_10m,                                            " +
                "   page_service,page_type,release_channel,                                   " +
                "   device_type,event_id,user_id,                                             " +
                "   count(1) as act_count                                                     " +
                "                                                                             " +
                " FROM TABLE(                                                                 " +
                "  TUMBLE(TABLE tmp, DESCRIPTOR(rt),interval '10' minutes)                    " +
                " )                                                                           " +
                " GROUP BY                                                                    " +
                "   window_start,                                                             " +
                "   window_end,                                                               " +
                "   dt,time_60m,time_30m,time_10m,user_id,                                    " +
                "   page_service,page_type,release_channel,device_type,event_id               ";
    }

    /**
     * Plain data holder whose fields mirror part of the aggregated result row.
     * It is not referenced anywhere else in this class.
     * <p>
     * NOTE(review): {@code act_count} is an {@code Integer} here while the
     * sink table declares {@code act_count BIGINT}, and the bean has no
     * {@code time_60m}/{@code time_30m} fields - confirm whether that is
     * intended before using this bean for sink rows.
     * <p>
     * The field declaration order determines the parameter order of the
     * Lombok-generated all-args constructor, so it must not be rearranged.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class EventsOvBean {
        private String dt;
        private String event_id;
        private Integer user_id;
        private String time_10m;
        private String page_service;
        private String page_type;
        private String release_channel;
        private String device_type;
        private Integer act_count;
    }
}
